#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import testlib

# Fingerprint the authorized keys list is expected to show once KEY was added
FP_SHA256 = "SHA256:iyVAl4Z8riL9Jg4fV9Wv/6cbqebdDtsBEMkojNLLYX8"
# Public key entered into the "add key" dialog; its comment field is "test-name"
KEY = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEAkRTvQCSEZNPXpA5bP82ilQn3TMeQ6z2NO3O0UwY9z test-name"


@testlib.nondestructive
class TestKeys(testlib.MachineCase):
    def testAuthorizedKeys(self):
        # Covers the authorized public keys list on the account page:
        #  - rejecting an invalid key and adding/removing a valid one as the
        #    account owner (including removal through the mobile kebab menu)
        #  - what is shown when looking at another account without access
        #  - ownership and mode of the files written when an admin with
        #    superuser rights adds a key for somebody else
        #  - invalid entries written to authorized_keys outside of the UI
        m = self.machine
        b = self.browser

        # On Ubuntu and Debian images a session without access can still find
        # out that ~/.ssh doesn't exist, which is shown as "no keys" rather
        # than as a permission error; on all other images the error is expected.
        expect_permission_error = "ubuntu" not in m.image and "debian" not in m.image

        # Create a user without any role
        m.execute("useradd user -s /bin/bash -m -c User")
        m.execute("echo user:foobar | chpasswd")

        def login(user):
            """Log in as *user* without superuser and wait for that user's account page."""
            self.login_and_go("/users#/" + user, user=user, superuser=False)
            b.wait_text("#account-user-name", user)

        def submit_key(text):
            """Open the add-key dialog, type *text* into it and press apply."""
            b.click('#authorized-key-add')
            b.wait_visible("#add-authorized-key-dialog")
            # wait for an empty input so earlier text cannot leak into this attempt
            b.wait_val("#authorized-keys-text", "")
            b.set_input_text("#authorized-keys-text", text)
            b.click("#add-authorized-key-dialog button.apply")

        def add_key(key, fp_sha256, comment):
            """Add *key* through the dialog and check its comment and fingerprint get listed."""
            submit_key(key)
            b.wait_not_present("#add-authorized-key-dialog")

            b.wait_in_text("#account-authorized-keys-list", comment)

            b.wait_not_in_text("#account-authorized-keys-list", "no authorized public keys")
            text = b.text("#account-authorized-keys-list")
            self.assertIn(fp_sha256, text)

        def check_perms():
            """Assert that ~/.ssh and authorized_keys belong to "user" and the file is mode 0600."""
            perms = m.execute("getfacl -a /home/user/.ssh")
            self.assertIn("owner: user", perms)

            perms = m.execute("getfacl -a /home/user/.ssh/authorized_keys")
            self.assertIn("owner: user", perms)
            self.assertIn("user::rw-", perms)
            self.assertIn("group::---", perms)
            self.assertIn("other::---", perms)

        # no keys
        login("user")
        b.wait_in_text("#account-authorized-keys", "no authorized public keys")

        # add bad
        submit_key("bad")
        b.wait_in_text("#add-authorized-key-dialog", "The key you provided was not valid")
        b.click("#add-authorized-key-dialog button.cancel")
        # make sure the rejected dialog is gone before a fresh one gets opened
        b.wait_not_present("#add-authorized-key-dialog")

        # add good
        add_key(KEY, FP_SHA256, "test-name")

        # remove key on mobile, on mobile we show a kebab menu
        b.set_layout("mobile")
        b.click("#account-authorized-keys button.pf-v6-c-menu-toggle")
        b.click("button.pf-v6-c-menu__item")
        b.wait_count("#account-authorized-keys-list tr", 1)
        b.set_layout("desktop")

        # add good
        add_key(KEY, FP_SHA256, "test-name")

        # Try see admin
        b.go("#/admin")
        b.wait_text("#account-user-name", "admin")

        # Not allowed (see expect_permission_error above for the exception)
        if expect_permission_error:
            b.wait_in_text("#account-authorized-keys", "You do not have permission")

        b.logout()

        # delete whole ssh to start fresh
        m.execute("rm -rf /home/user/.ssh")
        # "ls" without -A hides dot entries, which would make this check vacuous;
        # compare whole entries so unrelated names cannot match by substring
        self.assertNotIn(".ssh", m.execute("ls -A /home/user").split())

        # Log in as admin
        login("admin")
        b.go("#/user")

        if expect_permission_error:
            b.wait_in_text("#account-authorized-keys",
                           "You do not have permission to view the authorized public keys for this account.")

        b.become_superuser()

        b.wait_in_text("#account-authorized-keys", "no authorized public keys")

        # Adding keys sets permissions properly
        b.wait_text("#account-user-name", "user")
        add_key(KEY, FP_SHA256, "test-name")
        check_perms()

        b.wait_count("#account-authorized-keys-list tr", 1)

        # Add invalid key directly
        m.write("/home/user/.ssh/authorized_keys", "\nbad\n", append=True)
        b.wait_in_text("#account-authorized-keys-list tbody:last-child", "Invalid key")
        b.wait_count("#account-authorized-keys-list tr", 2)

        # Removing the key
        b.click("#account-authorized-keys-list tbody:last-child button")
        b.wait_not_in_text("#account-authorized-keys", "Invalid key")
        b.wait_count("#account-authorized-keys-list tr", 1)
        # only the valid key may be left in the file, without stray blank lines
        data = m.execute("cat /home/user/.ssh/authorized_keys")
        self.assertEqual(data, KEY + "\n")
        # Permissions are still ok
        check_perms()
        b.logout()

        # User can still see their keys
        login("user")
        b.wait_in_text("#account-authorized-keys-list tbody:first-child", "test-name")

        b.click("#account-authorized-keys-list tbody:first-child button")
        b.wait_in_text("#account-authorized-keys", "no authorized public keys")

        self.allow_journal_messages('authorized_keys is not a public key file.')
        self.allow_journal_messages('Missing callback called fullpath = /home/user/.ssh/authorized_keys')
        # NOTE(review): empty pattern kept as-is; presumably it tolerates empty
        # journal lines - confirm whether it is still needed
        self.allow_journal_messages('')

    # Possible workaround - ssh as `admin` and just do `m.execute()`
    @testlib.skipBrowser("Firefox cannot do `cockpit.spawn`", "firefox")
    def testPrivateKeys(self):
        # Covers the "SSH keys" dialog from the session menu: keys found in
        # ~/.ssh are listed with their details, can be loaded into and unloaded
        # from the agent, can have their password changed, and additional key
        # files (with and without passphrase) can be added by path.
        b = self.browser
        m = self.machine

        def list_keys():
            """Return the output of "ssh-add -l" run inside the logged in session."""
            return b.eval_js("cockpit.spawn([ '/bin/sh', '-c', 'ssh-add -l || true' ])")

        def toggleExpandedStateKey(identifier):
            """Click the expand/collapse toggle of the key row named *identifier*."""
            b.click(f"tr[data-name='{identifier}'] .pf-v6-c-table__toggle > button")

        def waitKeyPresent(identifier, present):
            """Wait for the key row to be visible (*present*) or to be gone."""
            if present:
                b.wait_visible(f"tr[data-name='{identifier}']")
            else:
                b.wait_not_present(f"tr[data-name='{identifier}']")

        def waitKeyLoaded(identifier, enabled):
            """Wait until the key's checkbox and data-loaded attribute both reflect *enabled*."""
            # The inner colon matters: ":not(checked)" would negate a <checked>
            # element type and therefore match every checkbox, checked or not.
            checked_state = ":checked" if enabled else ":not(:checked)"
            b.wait_visible(f"tr[data-name='{identifier}'] input[type=checkbox]{checked_state}")
            b.wait_visible(f"tr[data-name='{identifier}'][data-loaded={'true' if enabled else 'false'}]")

        def toggleKeyState(identifier):
            """Click the switch that loads/unloads the key named *identifier*."""
            b.click(f"tr[data-name='{identifier}'] .pf-v6-c-switch__input")

        def selectTab(identifier, title):
            """Switch the expanded key row to the tab labelled *title*."""
            b.click(f"tr[data-name='{identifier}'] + tr li > button:contains('{title}')")

        def waitTabActive(identifier, title):
            """Wait until the tab labelled *title* is the current one."""
            b.wait_visible(f"tr[data-name='{identifier}'] + tr li.pf-m-current > button:contains('{title}')")

        def waitKeyRowExpanded(identifier, expanded):
            """Wait for the detail row following the key row to be shown or hidden."""
            if expanded:
                b.wait_visible(f"tr[data-name='{identifier}'] + tr .ct-listing-panel-body:not([hidden])")
            else:
                b.wait_not_visible(f"tr[data-name='{identifier}'] + tr")

        def waitKeyDetail(identifier, dtype, value):
            """Wait until the detail labelled *dtype* contains *value*."""
            b.wait_in_text(f"tr[data-name='{identifier}'] + tr dt:contains({dtype}) + dd > div", value)

        def getKeyDetail(identifier, dtype):
            """Return the text of the detail labelled *dtype*."""
            return b.text(f"tr[data-name='{identifier}'] + tr dt:contains({dtype}) + dd > div")

        # Operating systems where auto loading doesn't work
        auto_load = not m.ws_container

        # On Arch the PAM stack for cockpit gets pam_ssh_add appended here
        if m.image == "arch":
            self.write_file("/etc/pam.d/cockpit", """
auth       optional     pam_ssh_add.so
session    optional     pam_ssh_add.so
""", append=True)

        self.restore_dir("/home/admin")

        # Put all the keys in place
        m.execute("mkdir -p /home/admin/.ssh")
        m.upload([
            "verify/files/ssh/id_rsa",
            "verify/files/ssh/id_rsa.pub",
            "verify/files/ssh/id_ed25519",
            "verify/files/ssh/id_ed25519.pub"
        ], "/home/admin/.ssh/")
        m.execute("chmod 600 /home/admin/.ssh/id_*")
        m.execute("chown -R admin:admin /home/admin/.ssh")

        self.login_and_go()

        # "<bits> <fingerprint>" prefixes as printed by "ssh-add -l"
        id_rsa = "2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw"
        id_ed25519 = "256 SHA256:Wd028KYmG3OVLp7dBmdx0gMR7VcarJVIfaTtKqYCmak"

        keys = list_keys()
        if auto_load:
            self.assertIn(id_rsa, keys)
        self.assertNotIn(id_ed25519, keys)

        b.open_session_menu()
        b.click("#sshkeys")

        # Automatically loaded
        waitKeyLoaded('id_rsa', enabled=auto_load)
        toggleExpandedStateKey('id_rsa')
        waitKeyRowExpanded('id_rsa', expanded=True)
        waitTabActive('id_rsa', 'Details')
        waitKeyDetail('id_rsa', 'Comment', "test@test")
        waitKeyDetail('id_rsa', 'Type', "RSA")
        # the dialog shows the bare fingerprint, so drop the leading "2048 "
        self.assertEqual(getKeyDetail('id_rsa', 'Fingerprint'), id_rsa[5:])

        selectTab('id_rsa', 'Public key')
        b.wait_val("tr[data-name=id_rsa] + tr .pf-v6-c-clipboard-copy input",
                   "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDG4iipTovcMg0xn+089QLNKVGpP"
                   "2Pgq2duxHgAXre2XgA3dZL+kooioGFwBQSEjbWssKy82hKIN/W82/lQtL6krf7JQW"
                   "nT3LZwD5DPsvHFKhOLghbiFzSI0uEL4NFFcZOMo5tGLrM5LsZsaIkkv5QkAE0tHIy"
                   "eYinK6dQ2d8ZsfmgqxHDUQUWnz1T75X9fWQsUugSWI+8xAe0cfa4qZRz/IC+K7DEB"
                   "3x4Ot5pl8FBuydJj/gb+Lwo2Vs27/d87W/0KHCqOHNwaVC8RBb1WcmXRDDetLGH1A"
                   "9m5x7Ip/KU/cyvWWxw8S4VKZkTIcrGUhFYJDnjtE3Axz+D7agtps41t test@test")
        toggleExpandedStateKey('id_rsa')
        waitKeyRowExpanded('id_rsa', expanded=False)

        # Load the id_ed25519 key
        waitKeyLoaded('id_ed25519', enabled=False)
        toggleKeyState('id_ed25519')
        b.set_input_text("#id_ed25519-password", "locked")
        b.click("#id_ed25519-unlock")
        waitKeyLoaded('id_ed25519', enabled=True)

        # Both keys are now loaded
        keys = list_keys()
        if auto_load:
            self.assertIn(id_rsa, keys)
        self.assertIn(id_ed25519, keys)

        # Unload the RSA key
        if auto_load:
            toggleKeyState('id_rsa')
            waitKeyLoaded('id_rsa', enabled=False)

        # Only the ed25519 key is loaded now
        keys = list_keys()
        self.assertIn(id_ed25519, keys)
        self.assertNotIn(id_rsa, keys)

        # Change password of the ed25519 key
        toggleExpandedStateKey('id_ed25519')
        selectTab('id_ed25519', 'Password')
        b.set_input_text("#id_ed25519-old-password", "locked")
        b.set_input_text("#id_ed25519-new-password", "foobar")
        b.set_input_text("#id_ed25519-confirm-password", "foobar")
        b.assert_pixels("#credentials-modal", "ssh-keys-dialog", chrome_hack_double_shots=True)

        b.click("#id_ed25519-change-password")
        b.wait_visible('#credentials-modal .pf-v6-c-helper-text__item.pf-m-success')

        # Log off and log back in, and we should have both loaded automatically
        if auto_load:
            b.click("#credentials-modal button[aria-label=Close]")
            b.wait_not_present("#credentials-modal")
            b.logout()
            b.login_and_go()
            keys = list_keys()
            self.assertIn(id_rsa, keys)
            self.assertIn(id_ed25519, keys)

            # re-open the dialog so both code paths continue with it showing
            b.open_session_menu()
            b.click("#sshkeys")
            b.wait_visible("#credentials-modal")

        # Add bad keys
        # generate a new key; register the cleanup first so that files are
        # removed even when key generation itself fails half way
        self.addCleanup(m.execute, "rm -f /tmp/new.rsa*")
        m.execute("ssh-keygen -t rsa -N '' -f /tmp/new.rsa")
        m.execute("chown admin:admin /tmp/new.rsa")
        # keep only the key type token for the later prefix comparison
        new_pk = m.execute("cat /tmp/new.rsa.pub").strip().split()[0]
        m.execute("rm /tmp/new.rsa.pub")

        waitKeyPresent('id_rsa', present=True)

        b.wait_visible("#credential-keys")
        b.wait_not_present("#ssh-file-add")
        b.click("#ssh-file-add-custom")
        # a directory is not a key
        b.set_file_autocomplete_val("#ssh-file-add-key", "/etc/")
        b.click("#ssh-file-add")
        b.wait_text("#credentials-modal .pf-m-error > .pf-v6-c-helper-text__item-text", "Not a valid private key")

        # file autocomplete: unknown path, then "/var/" after deleting "test/", then clearing
        b.set_input_text("#credentials-modal .pf-v6-c-menu-toggle input", "/var/test/")
        b.wait_in_text(".pf-v6-c-menu__list-item.pf-m-aria-disabled", "No such file or directory")
        b.focus("#credentials-modal .pf-v6-c-menu-toggle input")
        b.key("Backspace", 5)
        b.wait_visible(".pf-v6-c-menu__list-item.directory:contains('/var/lib/')")
        b.click("#credentials-modal .pf-v6-c-menu-toggle button[aria-label='Clear input value']")
        b.wait_val("#credentials-modal .pf-v6-c-menu-toggle input", "")

        b.set_file_autocomplete_val("#ssh-file-add-key", "/tmp/new.rsa")
        b.click("#ssh-file-add")
        b.wait_not_present("#ssh-file-add")
        # OpenSSH 7.8 and up has a new default key format where
        # keys are marked as "agent_only", thereby limiting functionality
        keys = list_keys()
        # the new key is expected on the last line: after id_rsa and id_ed25519
        # with auto loading, after id_ed25519 alone without it
        keys_length = 3 if auto_load else 2
        # Keys are like: 256 SHA256:x6S6fxMuEyqhpwNRAIK7ms6bZDY6xK9wzdDr2kCaWVY id_ed25519 (ED25519)
        # We need the id_ed25519 part, (name or comment)
        new_key = keys.splitlines()[keys_length - 1].split(' ')[2]
        toggleKeyState(new_key)

        toggleExpandedStateKey(new_key)
        waitKeyRowExpanded(new_key, expanded=True)
        waitTabActive(new_key, 'Details')
        waitKeyDetail(new_key, 'Type', 'RSA')
        self.assertNotEqual(getKeyDetail(new_key, 'Fingerprint'), "")
        selectTab(new_key, 'Public key')
        self.assertTrue(b.val(f"tr[data-name='{new_key}'] + tr .pf-v6-c-clipboard-copy input").startswith(new_pk))

        # OpenSSH 7.8 and up has a new default key format where
        # keys are marked as "agent_only", thereby limiting functionality
        # "agent_only" keys cannot be turned off, or have their passwords changed
        waitKeyLoaded(new_key, enabled=True)

        # Test adding key with passphrase
        self.addCleanup(m.execute, "rm -f /tmp/new_with_passphrase.rsa*")
        m.execute("ssh-keygen -t rsa -N 'foobar' -f /tmp/new_with_passphrase.rsa")
        m.execute("chown admin:admin /tmp/new_with_passphrase.rsa")
        m.execute("rm /tmp/new_with_passphrase.rsa.pub")

        b.wait_not_present("#ssh-file-add")
        b.click("#ssh-file-add-custom")
        b.set_file_autocomplete_val("#ssh-file-add-key", "/tmp/new_with_passphrase.rsa")
        b.click("#ssh-file-add")
        b.wait_visible("h1:contains('Unlock key /tmp/new_with_passphrase.rsa')")
        # the element id contains the file path, hence the escaped "/" and "."
        b.set_input_text("#\\/tmp\\/new_with_passphrase\\.rsa-password", "foobar")
        b.click("button:contains('Unlock')")
        b.wait_not_present("h1:contains('Unlock key /tmp/new_with_passphrase.rsa')")
        b.wait_not_present("#ssh-file-add")
        b.wait_count("#credential-keys tbody", 4)

        b.click("#credentials-modal button[aria-label=Close]")
        b.wait_not_present("#credentials-modal")


# Run the tests through testlib's entry point when this file is executed directly
if __name__ == '__main__':
    testlib.test_main()
